package com.niit.kafka;


import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

//指定偏移量进行消费
public class ConsumerSeekTime {

    public static void main(String[] args) throws InterruptedException {
        //1.配置链接信息
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"BD2_6");

        //2.创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);

        //3.订阅主题
        consumer.subscribe(Arrays.asList("BD2_old_user"));
        //我想获得2023-3-7 16:19以后的数据  -->  2023-3-7 16:19(偏移量)

        //4.1指定位置进行消费    获得当前消费者 所消费的主题下所有分区信息
        Set<TopicPartition> assignment = consumer.assignment();
        //4.2保证分区分配方案指定完成  -- 初始化分区分配信息
        while (assignment.size() == 0){
            consumer.poll(Duration.ofSeconds(1));
            //将初始化的分区分配信息重新给 assignment
            assignment = consumer.assignment();
        }
        //把时间转换为对应的偏移量   某个分区  --> 时间戳
        HashMap<TopicPartition,Long> topicPartitionLongHashMap = new HashMap<>();

        for (TopicPartition topicPartition :assignment) {
                                                            //16542346587487  - 1 *24 * 3600 * 1000
            //将某个分区  和  时间戳 进行保存
            topicPartitionLongHashMap.put(topicPartition,System.currentTimeMillis() - 1 *24 * 3600 * 1000);
        }
        //                                                                    消费者根据上面保存的分区和时间戳获得新Map 分区 和 时间戳偏移量
        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(topicPartitionLongHashMap);

        //指定消费offset
        for (TopicPartition topicPartition :assignment){
                                                    //将上面的Map通过分区为键的方式 获得 时间戳偏移量
            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
            //消费者 根据 时间戳偏移量进行移动
            consumer.seek(topicPartition,offsetAndTimestamp.offset());
        }


        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));

            for (ConsumerRecord<String,String> record:records) {
                 System.out.println(record);
                Thread.sleep(5000);

            }
        }


    }
}
